在昨天的學習中,我們深入理解了優化器框架的設計,特別是 OptimizerRule
體系如何通過固定點迭代來優化邏輯計劃。今天我們將探討查詢執行生命週期中的最後一個關鍵階段:物理計劃生成。
物理計劃生成是將經過優化的邏輯計劃轉換為可執行的物理計劃的過程。這個階段不僅要考慮邏輯操作的正確性,更要考慮實際執行時的效能、記憶體使用、並行度等物理層面的因素。理解這個轉換過程對於掌握 DataFusion 的執行機制至關重要。
PhysicalPlanner
是 DataFusion 中負責將邏輯計劃轉換為物理計劃的核心組件。它的設計遵循以下幾個重要原則:
#[async_trait]
pub trait PhysicalPlanner: Send + Sync {
/// 從邏輯計劃創建物理計劃
async fn create_physical_plan(
&self,
logical_plan: &LogicalPlan,
session_state: &SessionState,
) -> Result<Arc<dyn ExecutionPlan>>;
/// 從邏輯表達式創建物理表達式
fn create_physical_expr(
&self,
expr: &Expr,
input_dfschema: &DFSchema,
session_state: &SessionState,
) -> Result<Arc<dyn PhysicalExpr>>;
}
create_physical_plan
方法負責整個轉換過程,而 create_physical_expr
則處理表達式的轉換。這種分層設計使得表達式轉換可以獨立進行,提高了代碼的模組化程度。
葉子節點是查詢樹的起點,它們直接與數據源交互:
LogicalPlan::TableScan(TableScan {
source,
projection,
filters,
fetch,
..
}) => {
let source = source_as_provider(source)?;
let filters = unnormalize_cols(filters.iter().cloned());
let opts = ScanArgs::default()
.with_projection(projection.as_deref())
.with_filters(Some(&filters_vec))
.with_limit(*fetch);
let res = source.scan_with_args(session_state, opts).await?;
Arc::clone(res.plan())
}
這裡的關鍵是 TableProvider
的 scan_with_args
方法,它會根據具體的數據源類型(Parquet、CSV、JSON等)創建相應的物理執行計劃。
LogicalPlan::Values(Values { values, schema }) => {
let exec_schema = schema.as_ref().to_owned().into();
let exprs = values
.iter()
.map(|row| {
row.iter()
.map(|expr| {
self.create_physical_expr(expr, schema, session_state)
})
.collect::<Result<Vec<Arc<dyn PhysicalExpr>>>>()
})
.collect::<Result<Vec<_>>>()?;
MemorySourceConfig::try_new_as_values(SchemaRef::new(exec_schema), exprs)?
}
Values 節點直接轉換為記憶體數據源,所有表達式都需要轉換為物理表達式。
LogicalPlan::Projection(Projection { input, expr, .. }) => self
.create_project_physical_exec(
session_state,
children.one()?,
input,
expr,
)?,
投影操作需要將邏輯表達式轉換為物理表達式,並創建相應的執行計劃。
LogicalPlan::Filter(Filter {
predicate, input, ..
}) => {
let physical_input = children.one()?;
let input_dfschema = input.schema();
let runtime_expr = self.create_physical_expr(predicate, input_dfschema, session_state)?;
let filter = match self.try_plan_async_exprs(/* ... */)? {
PlanAsyncExpr::Sync(PlannedExprResult::Expr(runtime_expr)) => {
FilterExec::try_new(Arc::clone(&runtime_expr[0]), physical_input)?
}
// 處理異步表達式的情況
// ...
};
let selectivity = session_state.config().options().optimizer.default_filter_selectivity;
Arc::new(filter.with_default_selectivity(selectivity)?)
}
過濾操作不僅要轉換謂詞表達式,還要考慮選擇性估計,這對後續的優化很重要。
LogicalPlan::Aggregate(Aggregate {
input,
group_expr,
aggr_expr,
..
}) => {
let input_exec = children.one()?;
let groups = self.create_grouping_physical_expr(/* ... */)?;
let agg_filter = aggr_expr.iter().map(|e| {
create_aggregate_expr_and_maybe_filter(/* ... */)
}).collect::<Result<Vec<_>>>()?;
// 第一階段:Partial 聚合
let initial_aggr = Arc::new(AggregateExec::try_new(
AggregateMode::Partial,
groups.clone(),
aggregates,
filters.clone(),
input_exec,
Arc::clone(&physical_input_schema),
)?);
// 第二階段:Final 聚合
let final_grouping_set = initial_aggr.group_expr().as_final();
Arc::new(AggregateExec::try_new(
next_partition_mode,
final_grouping_set,
updated_aggregates,
filters,
initial_aggr,
Arc::clone(&physical_input_schema),
)?)
}
聚合操作採用兩階段設計:先進行 Partial 聚合,再進行 Final 聚合。這種設計支援分散式執行和記憶體優化。
LogicalPlan::Join(Join {
left: original_left,
right: original_right,
on: keys,
filter,
join_type,
null_equality,
schema: join_schema,
..
}) => {
let [physical_left, physical_right] = children.two()?;
// 處理表達式連接鍵的情況
let has_expr_join_key = keys.iter().any(|(l, r)| {
!(matches!(l, Expr::Column(_)) && matches!(r, Expr::Column(_)))
});
if has_expr_join_key {
// 需要添加投影來計算連接鍵
// ...
}
// 根據統計信息選擇連接策略
let join_algorithm = self.choose_join_algorithm(/* ... */)?;
match join_algorithm {
JoinAlgorithm::Hash => {
HashJoinExec::try_new(/* ... */)
}
JoinAlgorithm::SortMerge => {
SortMergeJoinExec::try_new(/* ... */)
}
// ...
}
}
連接操作需要考慮多個因素:連接鍵的類型、數據大小、是否已排序等,來選擇最適合的連接算法。
在選擇執行策略時,PhysicalPlanner 會考慮以下因素:
// 根據配置選擇分區策略
let can_repartition = session_state.config().target_partitions() > 1
&& session_state.config().repartition_aggregations();
let next_partition_mode = if can_repartition {
AggregateMode::FinalPartitioned
} else {
AggregateMode::Final
};
分區策略直接影響查詢的並行執行能力,需要根據目標分區數和具體操作類型來決定。
TableProvider
是數據源的核心抽象,它定義了如何從各種數據源讀取數據:
#[async_trait]
pub trait TableProvider: Send + Sync {
fn schema(&self) -> SchemaRef;
fn table_type(&self) -> TableType;
async fn scan(
&self,
state: &dyn Session,
projection: Option<&Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>>;
fn supports_filters_pushdown(
&self,
filters: &[&Expr],
) -> Result<Vec<TableProviderFilterPushDown>>;
}
每種類型都有其特定的優化策略和限制。
DataFusion 支援多種分區策略:
// 分區剪枝:只掃描相關的分區
let (partition_filters, filters): (Vec<_>, Vec<_>) =
filters.iter().cloned().partition(|filter| {
can_be_evaluated_for_partition_pruning(&table_partition_col_names, filter)
});
分區剪枝可以顯著減少需要掃描的數據量,提高查詢效能。
在轉換邏輯表達式到物理表達式時,要充分利用表達式的特性:
// 利用表達式的可交換性進行優化
if expr.is_commutative() {
// 可以重新排列操作順序以優化執行
}
物理計劃生成時要考慮記憶體使用:
// 根據記憶體限制選擇執行策略
let uses_bounded_memory = window_expr.iter().all(|e| e.uses_bounded_memory());
if uses_bounded_memory {
Arc::new(BoundedWindowAggExec::try_new(/* ... */)?)
} else {
Arc::new(WindowAggExec::try_new(/* ... */)?)
}
充分利用多核處理器:
// 根據目標分區數決定是否重新分區
let can_repartition = session_state.config().target_partitions() > 1
&& session_state.config().repartition_window_functions();
物理計劃生成是 DataFusion 查詢執行生命週期中的關鍵環節,它將抽象的邏輯操作轉換為具體的可執行計劃。通過理解這個過程,我們可以:
明天我們將深入探討 ExecutionPlan 體系架構,了解物理計劃的執行模型和核心算子的工作原理。